/*
 * @Author: lokei
 * @Date: 2022-09-21 18:43:12
 * @LastEditors: lokei
 * @LastEditTime: 2023-10-24 20:51:00
 * @Description: 
 */
package cn.lokei.task.handler;

import java.util.List;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import cn.lokei.task.entity.printer.Printer;
import cn.lokei.task.entity.redis.RedisStream;
import cn.lokei.task.handler.douyin.SyncOrderHandler;
import cn.lokei.task.handler.message.SendMessageHandler;
import cn.lokei.task.service.iot.cabinet.CabinetService;
import cn.lokei.task.service.print.PrintService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class RedisStreamListenerMessage implements StreamListener<String, ObjectRecord<String, String>> {

    /**
     * redis Stream 工具类
     */
    @Autowired
    private RedisStream redisStream;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    PrintService printService;

    @Resource
    CabinetService cabinetService;

    @Autowired
    SendMessageHandler sendMessageHandler;

    @Autowired
    SyncOrderHandler syncOrderHandler;

    /**
     * 消息监听
     *
     * @param message e
     */
    @SneakyThrows
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        log.info("接受redisStream: {},监听到消息：{}", message.getStream(), message);
        String message_topic = message.getStream();
        if (message_topic != null) {
            StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
            switch (message_topic) {
                case "smt_print":
                    List<MapRecord<String, String, String>> result = streamOperations.range("smt_print",
                            Range.closed(message.getId().toString(), message.getId().toString()));
                    if (result != null && result.size() > 0) {
                        // 开始和结束都是同一个ID，所以结果只有一条
                        MapRecord<String, String, String> record = result.get(0);
                        String printer_str = record.getValue().get("printer");
                        JSONObject jsonObject = JSON.parseObject(printer_str);
                        Printer printer = new Printer();
                        printer.setType(jsonObject.getString("type"));
                        printer.setUser(jsonObject.getString("user"));
                        printer.setUkey(jsonObject.getString("ukey"));
                        printer.setSn(jsonObject.getString("sn"));
                        Integer print_number = 1;
                        if (jsonObject.getString("print_number") != null) {
                            print_number = Integer.valueOf(jsonObject.getString("print_number"));
                        }
                        printer.setPrint_number(print_number);
                        printService.print(printer, record.getValue().get("content"));

                        // 消费完成后确认消费（ACK）
                        redisStream.ack(String.valueOf(message.getStream()), "smt_print_grp",
                                String.valueOf(message.getId()));

                        streamOperations.delete(record);
                    }
                    break;
                case "send_message":
                    List<MapRecord<String, String, String>> result_send_message = streamOperations.range("send_message",
                            Range.closed(message.getId().toString(), message.getId().toString()));
                    if (result_send_message != null && result_send_message.size() > 0) {
                        // 开始和结束都是同一个ID，所以结果只有一条
                        MapRecord<String, String, String> record = result_send_message.get(0);
                        sendMessageHandler.handler(record);

                        // 消费完成后确认消费（ACK）
                        redisStream.ack(String.valueOf(message.getStream()), "send_message_grp",
                                String.valueOf(message.getId()));

                        streamOperations.delete(record);
                    }
                    break;
                case "open_cabinet":
                    List<MapRecord<String, String, String>> result_open_cabinet = streamOperations.range("open_cabinet",
                            Range.closed(message.getId().toString(), message.getId().toString()));
                    if (result_open_cabinet != null && result_open_cabinet.size() > 0) {
                        // 开始和结束都是同一个ID，所以结果只有一条
                        MapRecord<String, String, String> record = result_open_cabinet.get(0);
                        cabinetService.openBox(record);

                        // 消费完成后确认消费（ACK）
                        redisStream.ack(String.valueOf(message.getStream()), "open_cabinet_grp",
                                String.valueOf(message.getId()));

                        streamOperations.delete(record);
                    }
                    break;
                case "douyin_order_sync":
                    List<MapRecord<String, String, String>> result_douyin_order_sync = streamOperations.range("douyin_order_sync",
                            Range.closed(message.getId().toString(), message.getId().toString()));
                    if (result_douyin_order_sync != null && result_douyin_order_sync.size() > 0) {
                        // 开始和结束都是同一个ID，所以结果只有一条
                        MapRecord<String, String, String> record = result_douyin_order_sync.get(0);
                        syncOrderHandler.handler(record);

                        // 消费完成后确认消费（ACK）
                        redisStream.ack(String.valueOf(message.getStream()), "douyin_order_sync_grp",
                                String.valueOf(message.getId()));

                        streamOperations.delete(record);
                    }
                    break;
                default:
                    break;
            }
        }
    }

}
